home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2008 February / PCWFEB08.iso / Software / Freeware / Miro 1.0 / Miro_Installer.exe / Miro_Downloader.exe / eventloop.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2007-11-12  |  16.6 KB  |  532 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.5)
  3.  
  4. '''Event loop handler.
  5.  
  6. This module handles the democracy event loop which is responsible for network
  7. requests and scheduling.
  8.  
  9. TODO:
  10.     handle user setting clock back
  11. '''
  12. import threading
  13. import socket
  14. import errno
  15. import select
  16. import heapq
  17. import Queue
  18. import util
  19. import database
  20. import logging
  21. from clock import clock
  22. import util
  23. cumulative = { }
  24.  
  25. class DelayedCall(object):
  26.     
  27.     def __init__(self, function, name, args, kwargs):
  28.         self.function = function
  29.         self.name = name
  30.         self.args = args
  31.         self.kwargs = kwargs
  32.         self.canceled = False
  33.  
  34.     
  35.     def _unlink(self):
  36.         """Removes the references that this object has to the outside world,
  37.         this eases the GC's work in finding cycles and fixes some memory leaks
  38.         on windows.
  39.         """
  40.         self.function = None
  41.         self.args = None
  42.         self.kwargs = None
  43.  
  44.     
  45.     def cancel(self):
  46.         self.canceled = True
  47.         self._unlink()
  48.  
  49.     
  50.     def dispatch(self):
  51.         if not self.canceled:
  52.             when = 'While handling %s' % self.name
  53.             start = clock()
  54.             util.trapCall(when, self.function, *self.args, **self.kwargs)
  55.             end = clock()
  56.             if end - start > 0.5:
  57.                 logging.timing('%s too slow (%.3f secs)', self.name, end - start)
  58.             
  59.             
  60.             try:
  61.                 total = cumulative[self.name]
  62.             except KeyboardInterrupt:
  63.                 raise 
  64.             except:
  65.                 total = 0
  66.  
  67.             total += end - start
  68.             cumulative[self.name] = total
  69.             if total > 5:
  70.                 logging.timing('%s cumulative is too slow (%.3f secs)', self.name, total)
  71.                 cumulative[self.name] = 0
  72.             
  73.         
  74.         self._unlink()
  75.  
  76.  
  77.  
  78. class Scheduler(object):
  79.     
  80.     def __init__(self):
  81.         self.heap = []
  82.  
  83.     
  84.     def addTimeout(self, delay, function, name, args = None, kwargs = None):
  85.         if args is None:
  86.             args = ()
  87.         
  88.         if kwargs is None:
  89.             kwargs = { }
  90.         
  91.         scheduledTime = clock() + delay
  92.         dc = DelayedCall(function, 'timeout (%s)' % (name,), args, kwargs)
  93.         heapq.heappush(self.heap, (scheduledTime, dc))
  94.         return dc
  95.  
  96.     
  97.     def nextTimeout(self):
  98.         if len(self.heap) == 0:
  99.             return None
  100.         else:
  101.             return max(0, self.heap[0][0] - clock())
  102.  
  103.     
  104.     def hasPendingTimeout(self):
  105.         if len(self.heap) > 0:
  106.             pass
  107.         return self.heap[0][0] < clock()
  108.  
  109.     
  110.     def processNextTimeout(self):
  111.         (time, dc) = heapq.heappop(self.heap)
  112.         dc.dispatch()
  113.  
  114.     
  115.     def processTimeouts(self):
  116.         while self.hasPendingTimeout():
  117.             self.processNextTimeout()
  118.  
  119.  
  120.  
  121. class CallQueue(object):
  122.     
  123.     def __init__(self):
  124.         self.queue = Queue.Queue()
  125.  
  126.     
  127.     def addIdle(self, function, name, args = None, kwargs = None):
  128.         if args is None:
  129.             args = ()
  130.         
  131.         if kwargs is None:
  132.             kwargs = { }
  133.         
  134.         dc = DelayedCall(function, 'idle (%s)' % (name,), args, kwargs)
  135.         self.queue.put(dc)
  136.         return dc
  137.  
  138.     
  139.     def processNextIdle(self):
  140.         dc = self.queue.get()
  141.         dc.dispatch()
  142.  
  143.     
  144.     def hasPendingIdle(self):
  145.         return not self.queue.empty()
  146.  
  147.     
  148.     def processIdles(self):
  149.         while self.hasPendingIdle():
  150.             self.processNextIdle()
  151.  
  152.  
  153.  
  154. class ThreadPool(object):
  155.     """The thread pool is used to handle calls like gethostbyname() that block
  156.     and there's no asynchronous workaround.  What we do instead is call them
  157.     in a separate thread and return the result in a callback that executes in
  158.     the event loop.
  159.     """
  160.     THREADS = 3
  161.     
  162.     def __init__(self, eventLoop):
  163.         self.eventLoop = eventLoop
  164.         self.queue = Queue.Queue()
  165.         self.threads = []
  166.  
  167.     
  168.     def initThreads(self):
  169.         while len(self.threads) < ThreadPool.THREADS:
  170.             t = threading.Thread(name = 'ThreadPool - %d' % len(self.threads), target = self.threadLoop)
  171.             t.setDaemon(True)
  172.             t.start()
  173.             self.threads.append(t)
  174.  
  175.     
  176.     def threadLoop(self):
  177.         while True:
  178.             nextItem = self.queue.get()
  179.             if nextItem == 'QUIT':
  180.                 break
  181.             else:
  182.                 (callback, errback, func, name, args, kwargs) = nextItem
  183.             
  184.             try:
  185.                 result = func(*args, **kwargs)
  186.             except KeyboardInterrupt:
  187.                 raise 
  188.             except Exception:
  189.                 e = None
  190.                 func = errback
  191.                 name = 'Thread Pool Errback (%s)' % name
  192.                 args = (e,)
  193.  
  194.             func = callback
  195.             name = 'Thread Pool Callback (%s)' % name
  196.             args = (result,)
  197.             if not self.eventLoop.quitFlag:
  198.                 self.eventLoop.idleQueue.addIdle(func, name, args = args)
  199.                 self.eventLoop.wakeup()
  200.                 continue
  201.  
  202.     
  203.     def queueCall(self, callback, errback, function, name, *args, **kwargs):
  204.         self.queue.put((callback, errback, function, name, args, kwargs))
  205.  
  206.     
  207.     def closeThreads(self):
  208.         for x in xrange(len(self.threads)):
  209.             self.queue.put('QUIT')
  210.         
  211.         while len(self.threads) > 0:
  212.             x = self.threads.pop()
  213.             
  214.             try:
  215.                 x.join()
  216.             continue
  217.             continue
  218.  
  219.  
  220.  
  221.  
  222. class EventLoop(object):
  223.     
  224.     def __init__(self):
  225.         self.scheduler = Scheduler()
  226.         self.idleQueue = CallQueue()
  227.         self.urgentQueue = CallQueue()
  228.         self.threadPool = ThreadPool(self)
  229.         self.readCallbacks = { }
  230.         self.writeCallbacks = { }
  231.         (self.wakeSender, self.wakeReceiver) = util.makeDummySocketPair()
  232.         self.addReadCallback(self.wakeReceiver, self._slurpWakerData)
  233.         self.quitFlag = False
  234.         self.delegate = None
  235.         self.clearRemovedCallbacks()
  236.  
  237.     
  238.     def clearRemovedCallbacks(self):
  239.         self.removedReadCallbacks = set()
  240.         self.removedWriteCallbacks = set()
  241.  
  242.     
  243.     def _slurpWakerData(self):
  244.         self.wakeReceiver.recv(1024)
  245.  
  246.     
  247.     def setDelegate(self, delegate):
  248.         self.delegate = delegate
  249.  
  250.     
  251.     def addReadCallback(self, socket, callback):
  252.         self.readCallbacks[socket.fileno()] = callback
  253.  
  254.     
  255.     def removeReadCallback(self, socket):
  256.         del self.readCallbacks[socket.fileno()]
  257.         self.removedReadCallbacks.add(socket.fileno())
  258.  
  259.     
  260.     def addWriteCallback(self, socket, callback):
  261.         self.writeCallbacks[socket.fileno()] = callback
  262.  
  263.     
  264.     def removeWriteCallback(self, socket):
  265.         del self.writeCallbacks[socket.fileno()]
  266.         self.removedWriteCallbacks.add(socket.fileno())
  267.  
  268.     
  269.     def wakeup(self):
  270.         self.wakeSender.send('b')
  271.  
  272.     
  273.     def callInThread(self, callback, errback, function, name, *args, **kwargs):
  274.         self.threadPool.queueCall(callback, errback, function, name, *args, **kwargs)
  275.  
  276.     
  277.     def _beginLoop(self):
  278.         if self.delegate is not None and hasattr(self.delegate, 'beginLoop'):
  279.             self.delegate.beginLoop(self)
  280.         
  281.  
  282.     
  283.     def _endLoop(self):
  284.         if self.delegate is not None and hasattr(self.delegate, 'endLoop'):
  285.             self.delegate.endLoop(self)
  286.         
  287.  
  288.     
  289.     def loop(self):
  290.         database.set_thread()
  291.         loop_ready.set()
  292.         while not self.quitFlag:
  293.             self._beginLoop()
  294.             self.clearRemovedCallbacks()
  295.             timeout = self.scheduler.nextTimeout()
  296.             readfds = self.readCallbacks.keys()
  297.             writefds = self.writeCallbacks.keys()
  298.             
  299.             try:
  300.                 (readables, writeables, _) = select.select(readfds, writefds, [], timeout)
  301.             except select.error:
  302.                 (err, detail) = None
  303.                 if err == errno.EINTR:
  304.                     logging.warning('eventloop: %s', detail)
  305.                 else:
  306.                     raise 
  307.             except:
  308.                 err == errno.EINTR
  309.  
  310.             if self.quitFlag:
  311.                 break
  312.             
  313.             self.urgentQueue.processIdles()
  314.             for event in self.generateEvents(readables, writeables):
  315.                 event()
  316.                 if self.quitFlag:
  317.                     break
  318.                 
  319.                 self.urgentQueue.processIdles()
  320.                 if self.quitFlag:
  321.                     break
  322.                     continue
  323.             
  324.             self._endLoop()
  325.  
  326.     
  327.     def generateEvents(self, readables, writeables):
  328.         '''Generator that creates the list of events that should be dealt with
  329.         on this iteration of the event loop.  This includes all socket
  330.         read/write callbacks, timeouts and idle calls.  "events" are
  331.         implemented as functions that should be called with no arguments.
  332.         '''
  333.         for callback in self.generateCallbacks(writeables, self.writeCallbacks, self.removedWriteCallbacks):
  334.             yield callback
  335.         
  336.         for callback in self.generateCallbacks(readables, self.readCallbacks, self.removedReadCallbacks):
  337.             yield callback
  338.         
  339.         while self.scheduler.hasPendingTimeout():
  340.             yield self.scheduler.processNextTimeout
  341.         while self.idleQueue.hasPendingIdle():
  342.             yield self.idleQueue.processNextIdle
  343.  
  344.     
  345.     def generateCallbacks(self, readyList, map, removed):
  346.         for callbackEvent in readyList:
  347.             fd = None
  348.             
  349.             try:
  350.                 function = map[fd]
  351.             except KeyError:
  352.                 continue
  353.  
  354.             if fd in removed:
  355.                 continue
  356.             
  357.             when = 'While talking to the network'
  358.             yield callbackEvent
  359.             (None, None, None, None)
  360.         
  361.  
  362.  
  363. _eventLoop = EventLoop()
  364.  
  365. def setDelegate(delegate):
  366.     _eventLoop.setDelegate(delegate)
  367.  
  368.  
  369. def addReadCallback(socket, callback):
  370.     '''Add a read callback.  When socket is ready for reading, callback will
  371.     be called.  If there is already a read callback installed, it will be
  372.     replaced.
  373.     '''
  374.     _eventLoop.addReadCallback(socket, callback)
  375.  
  376.  
  377. def removeReadCallback(socket):
  378.     '''Remove a read callback.  If there is not a read callback installed for
  379.     socket, a KeyError will be thrown.
  380.     '''
  381.     _eventLoop.removeReadCallback(socket)
  382.  
  383.  
  384. def addWriteCallback(socket, callback):
  385.     '''Add a write callback.  When socket is ready for writing, callback will
  386.     be called.  If there is already a write callback installed, it will be
  387.     replaced.
  388.     '''
  389.     _eventLoop.addWriteCallback(socket, callback)
  390.  
  391.  
  392. def removeWriteCallback(socket):
  393.     '''Remove a write callback.  If there is not a write callback installed for
  394.     socket, a KeyError will be thrown.
  395.     '''
  396.     _eventLoop.removeWriteCallback(socket)
  397.  
  398.  
  399. def stopHandlingSocket(socket):
  400.     '''Convience function to that removes both the read and write callback for
  401.     a socket if they exist.'''
  402.     
  403.     try:
  404.         removeReadCallback(socket)
  405.     except KeyError:
  406.         pass
  407.  
  408.     
  409.     try:
  410.         removeWriteCallback(socket)
  411.     except KeyError:
  412.         pass
  413.  
  414.  
  415.  
  416. def addTimeout(delay, function, name, args = None, kwargs = None):
  417.     '''Schedule a function to be called at some point in the future.
  418.     Returns a DelayedCall object that can be used to cancel the call.
  419.     '''
  420.     dc = _eventLoop.scheduler.addTimeout(delay, function, name, args, kwargs)
  421.     return dc
  422.  
  423.  
  424. def addIdle(function, name, args = None, kwargs = None):
  425.     '''Schedule a function to be called when we get some spare time.
  426.     Returns a DelayedCall object that can be used to cancel the call.
  427.     '''
  428.     dc = _eventLoop.idleQueue.addIdle(function, name, args, kwargs)
  429.     _eventLoop.wakeup()
  430.     return dc
  431.  
  432.  
  433. def addUrgentCall(function, name, args = None, kwargs = None):
  434.     '''Schedule a function to be called as soon as possible.  This method
  435.     should be used for things like GUI actions, where the user is waiting on
  436.     us.
  437.     '''
  438.     dc = _eventLoop.urgentQueue.addIdle(function, name, args, kwargs)
  439.     _eventLoop.wakeup()
  440.     return dc
  441.  
  442.  
  443. def callInThread(callback, errback, function, name, *args, **kwargs):
  444.     '''Schedule a function to be called in a separate thread. Do not
  445.     put code that accesses the database or the UI here!
  446.     '''
  447.     _eventLoop.callInThread(callback, errback, function, name, *args, **kwargs)
  448.  
  449. lt = None
  450. profile_file = None
  451. loop_ready = threading.Event()
  452.  
  453. def startup():
  454.     global lt, lt
  455.     threadPoolInit()
  456.     
  457.     def profile_startup():
  458.         import profile
  459.         
  460.         def start_loop():
  461.             _eventLoop.loop()
  462.  
  463.         profile.runctx('_eventLoop.loop()', globals(), locals(), profile_file + '.event_loop')
  464.  
  465.     if profile_file:
  466.         lt = threading.Thread(target = profile_startup, name = 'Event Loop')
  467.     else:
  468.         lt = threading.Thread(target = _eventLoop.loop, name = 'Event Loop')
  469.     lt.setDaemon(False)
  470.     lt.start()
  471.     loop_ready.wait()
  472.  
  473.  
  474. def join():
  475.     if lt is not None:
  476.         lt.join()
  477.     
  478.  
  479.  
  480. def quit():
  481.     threadPoolQuit()
  482.     _eventLoop.quitFlag = True
  483.     _eventLoop.wakeup()
  484.  
  485.  
  486. def resetEventLoop():
  487.     _eventLoop = EventLoop()
  488.  
  489.  
  490. def threadPoolQuit():
  491.     _eventLoop.threadPool.closeThreads()
  492.  
  493.  
  494. def threadPoolInit():
  495.     _eventLoop.threadPool.initThreads()
  496.  
  497.  
  498. def asIdle(func):
  499.     '''Decorator to make a methods run as an idle function
  500.  
  501.     Suppose you have 2 methods, foo and bar
  502.  
  503.     @asIdle
  504.     def foo():
  505.         # database operations
  506.  
  507.     def bar():
  508.         # same database operations as foo
  509.  
  510.     Then calling foo() is exactly the same as calling addIdle(bar).
  511.     '''
  512.     
  513.     def queuer(*args, **kwargs):
  514.         return addIdle(func, '%s() (using asIdle)' % func.__name__, args = args, kwargs = kwargs)
  515.  
  516.     return queuer
  517.  
  518.  
  519. def asUrgent(func):
  520.     '''Like asIdle, but uses addUrgentCall() instead of addIdle().'''
  521.     
  522.     def queuer(*args, **kwargs):
  523.         return addUrgentCall(func, '%s() (using asUrgent)' % func.__name__, args = args, kwargs = kwargs)
  524.  
  525.     return queuer
  526.  
  527.  
  528. def checkHeapSize():
  529.     logging.info('Heap size: %d.', len(_eventLoop.scheduler.heap))
  530.     addTimeout(5, checkHeapSize, 'Check Heap Size')
  531.  
  532.